
In this here second part of the "Timestamped" series, we introduce its namesake Timestamped interface, and use a Deque from Java6 to collect such things, and impose a time-based capacity for that. 

<!--break-->

Ultimately we gonna hook up a remote Log4j appender, to digest logs in order to provide salient information via RMX and HTTP, and notify ourselves via Gtalk when the wheels are wobbling or even coming off altogether. <i>What fun!</i> But lemme not get ahead of myself.

<h3>Prequels</h3>

Incidently we kicked off this no-hit wonder series with <a href="http://weblogs.java.net/blog/evanx/archive/2012/06/07/counter-map">Counter Map</a>, which does not feature further yet, so ignore that for now.

<p>   
<a style='text-decoration: none;' href="https://code.google.com/p/vellum/wiki/home">
<div style="border-bottom: solid 1px; color: #ffffff; background-color: #aaaaaa;">
<img src="http://upload.wikimedia.org/wikipedia/commons/1/12/Gnome-appointment-new.svg" border="0" align="left" hspace="0"/><span style="font-style: normal; font-weight: bold; font-size: 12pt;">&nbsp;Timestamped Deque:</span><span style="font-style: italic; font-weight: normal; font-size: 11pt;">&nbsp;A part of "Timestamped: a trilogy in a few parts."</span>
</div>
</a>

Without further ado, I give you the namesake interface of this series.
<pre>
public interface Timestamped {
    public long getTimestamp();    
}
</pre>
which returns the timestamp in "millis" ala <tt>System.currentTimeMillis()</tt>.

Also take an adapter for Log4j's <tt>LoggingEvent</tt>. 
<pre>
public class TimestampedLoggingEventAdapter implements Timestamped {
    LoggingEvent loggingEvent;

    public TimestampedLoggingEventAdapter(LoggingEvent loggingEvent) {
        this.loggingEvent = loggingEvent;
    }

    @Override
    public long getTimestamp() {
        return loggingEvent.getTimeStamp();
    }
}
</pre>
And a generic wrapped element.
<pre>
public class TimestampedElement<T> implements Timestamped, Comparable<Timestamped> {
    T element;
    long timestamp;

    public TimestampedElement(T element, long timestamp) {
        this.element = element;
        this.timestamp = timestamp;
    }

    public T getElement() {
        return element;
    }
    
    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public int compareTo(Timestamped other) {
        if (timestamp < other.getTimestamp()) return -1;
        if (timestamp > other.getTimestamp()) return 1;
        else return 0;
    }    
}
</pre>

where we implement <tt>compareTo()</tt> for <a href="http://docs.oracle.com/javase/6/docs/api/java/lang/Comparable.html">natural ordering</a> by the timestamp. 

Since duplicate timestamps are possible i.e. where two or more events occur at the same millisecond, and indeed duplicate log messages at the same time, we forgo implementing <tt>hashCode()</tt> and <tt>equals()</tt>. Imagine we add such elements to a <a href="http://docs.oracle.com/javase/6/docs/api/java/util/Set.html"><tt>Set</tt></a>, whose javadoc describes it thus:
<blockquote>
    <tt>Set</tt> - A collection that contains no duplicate elements. More formally, sets contain no pair of elements e1 and e2 such that e1.equals(e)
</blockquote>
Therefore we defer to the default <tt>hashCode()</tt> and <tt>equals()</tt> from <tt>Object</tt>, which are based on object address reference.

We might construct a <a href="http://docs.oracle.com/javase/6/docs/api/java/util/SortedSet.html"><tt>SortedSet</tt></a> of <tt>Timestamped</tt> elements.
<blockquote>
    <tt>SortedSet</tt> - The elements are ordered using their natural ordering, or by a Comparator typically provided at sorted set creation time.
</blockquote>

So if we have not implemented <tt>compareTo()</tt>, then we will need a comparator.
<pre>
public class TimestampedComparator implements Comparator<Timestamped> {

    @Override
    public int compare(Timestamped o1, Timestamped o2) {
        if (o1.getTimestamp() < o2.getTimestamp()) return -1;
        if (o1.getTimestamp() > o2.getTimestamp()) return 1;
        else return 0;
    }    
}
</pre>

<img src="http://jroller.com/evanx/resource/bicycle-ghost-deck-white-256.jpg" align="right" hspace="16"/>
Our inclination might be collect <tt>Timestamped</tt> elements in a <a href="http://docs.oracle.com/javase/6/docs/api/java/util/List.html"><tt>List</tt></a>, or a <a href="http://docs.oracle.com/javase/6/docs/api/java/util/Queue.html"><tt>Queue</tt></a> perhaps.
<blockquote>
    <tt>Queue</tt> - A collection designed for holding elements prior to processing.
</blockquote>
That sounds rather appropriate for our digestive purposes, to find the ghost in the machine. 

<h2>Deque collector</h2>

So let's introduce the namesake of this article, a collector of timestamped thingies - a <a href="http://en.wikipedia.org/wiki/Circular_buffer">circular buffer</a>, some might call it - and impose a time-based capacity thereupon.

So we use the <a href="http://docs.oracle.com/javase/6/docs/api/java/util/Deque.html">java.util.Deque</a> found in Java 1.6, courtesy of those most excellent gentlemen, Doug Lea and Josh Bloch. Its javadoc describes it thus:
<blockquote>
    <tt>Deque</tt> - A linear collection that supports element insertion and removal at both ends. The name deque is short for "double ended queue" and is usually pronounced "deck."
</blockquote>
We use the efficient <a href="http://docs.oracle.com/javase/6/docs/api/java/util/ArrayDeque.html"><tt>ArrayDeque</tt></a> implementation.
<blockquote>
<tt>ArrayDeque</tt> - This class is likely to be faster than Stack when used as a stack, and faster than LinkedList when used as a queue.
</blockquote>
Fantastic. Let's get us some of that.
<pre>
public class TimestampedDequer<T extends Timestamped>  {
    long capacityMillis;
    long lastTimestamp;
    ArrayDeque<T> deque = new ArrayDeque();
    
    public TimestampedDequer(long capacityMillis) {
        this.capacityMillis = capacityMillis;
    }
    
    public synchronized void addLast(T element) {
        if (element.getTimestamp() == 0 || element.getTimestamp() < lastTimestamp) {
            deque.clear(); // throw our toys out the cot exception
        } else {
            lastTimestamp = element.getTimestamp();
            prune(lastTimestamp);
            deque.addLast(element);
        }
    }

    private void prune(long lastTimestamp) {
        while (deque.size() > 0 && 
                deque.getFirst().getTimestamp() <= lastTimestamp - capacityMillis) {
            deque.removeFirst();
        }
    }
</pre>

<img src="http://jroller.com/evanx/resource/arcane-deck-white-back-300.jpg" align="left" hspace="16"/>
where we compose an <tt>ArrayDeque</tt> and <tt>synchronize</tt> it "externally" for our purposes, considering that it will be digesting log records continually, whilst being under interrogation by RMX, HTTP requests and what-not. 

When we add an element onto the tail, we <tt>prune()</tt> expired elements from the head. Observe that the above implementation assumes that elements are added in chronological order. However we expect our host's time to be adjusted by <tt>NTP</tt> occassionally - hence we <tt>clear()</tt> i.e. when we encounter an eventuality that we don't play nicely with.

If we are digesting logs from multiple servers or what-have-you, the above so-called "dequer" aint gonna work, baby - it's gonna come up empty, baby. Don't shuffle this deque, baby. <i>(As Elvis might have said.)</i> We'll deal with such a handful another time.

Now, in order to analyse the contents for the desired interval, we take a <tt>snapshot()</tt>.
<pre>
    public synchronized Deque<T> snapshot(long lastTimestamp) {
        prune(lastTimestamp);
        return deque.clone();
    }
</pre>
which returns a defensive copy, and so is a relatively costly operation. Perhaps you could recommend an alternative strategy? Perhaps we could implement a special concurrent deque implementation in a future episode, as an exercise? Taking inspiration from that <a href="http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html">Disruptor</a> thingymajig, perchance, as well as <tt>ArrayDeque</tt> itself? One that supports aggregating from multiple servers, methinks.

Another use-case is to get the tail i.e. the latest so-many elements, for informational purposes e.g. to display via a servlet, or attach to an email alert.
<pre>
    public synchronized Deque<T> tail(int size) {
        Deque tail = new ArrayDeque();
        Iterator<T> it = deque.descendingIterator();
        for (int i = 0; i < size && it.hasNext(); i++) {
            tail.addFirst(it.next());
        }
        return tail;
    }    
</pre>
where we use <tt>descendingIterator()</tt> to read from the tail of the deque, and <tt>addFirst()</tt> to rectify the order.

Let's test this thing. 
<pre>
public class TimestampedDequerTest  {
    TimestampedDequer<TimestampedElement> dequer = new TimestampedDequer(capacityMillis);
    boolean verbose = false;
    
    private void addLast() {
        long timestamp = System.currentTimeMillis();
        String value = "record at " + timestamp;
        dequer.addLast(new TimestampedElement(value, timestamp));
        if (verbose) {
            System.out.println(value);
        }
    }

    @Test
    public void test() throws Exception {
        check();
        check();
    }
</pre>
where we <tt>check()</tt> twice... just to make sure. Of <tt>prune()</tt>, that is. 
<pre>
    private void check() throws Exception {
        Thread.sleep(capacityMillis);
        Assert.assertEquals(0, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(0, dequer.tail(4).size());
        addLast();
        Assert.assertEquals(1, dequer.tail(4).size());
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Thread.sleep(capacityMillis/2);
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(1, dequer.tail(4).size());
        addLast();
        Assert.assertEquals(2, dequer.tail(4).size());
        Assert.assertEquals(2, dequer.snapshot(System.currentTimeMillis()).size());
        Thread.sleep(capacityMillis/2);
        Assert.assertEquals(2, dequer.tail(4).size());
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(1, dequer.tail(4).size());
    }
</pre>
<img src="http://jroller.com/evanx/resource/bicycle-guardians-deck-back-crop.jpg" align="right" hspace="16"/>
where expect the final <tt>snapshot()</tt> to loose an element to <tt>prune()</tt>'ing, given the two half <tt>capacityMillis</tt> sleeps since the first <tt>addList()</tt>.

Considering that the purpose of this <tt>Timestamped</tt> series is reducing information overload, we'll tail off here for now, and leave the "heavy-dropping" for next week, namely, testing with threads.

Thereafter, we'll see about using our <tt>TimestampedDequer</tt> to analyse the latest <tt>deque</tt> of logs e.g. every minute, to detect when our app might be coming down like a house of cards.

<h3>Resources</h3>

https://code.google.com/p/vellum/ - where i will collate these articles and their code.

<h3>Credits</h3>

Thanks to my colleague Zach Visagie at BizSwitch.net, for his kind reviews and indispensible input!

